热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

利用RabbitMQ实现高效延迟任务处理

本文详细探讨了如何利用RabbitMQ实现延迟任务,包括其应用场景、实现原理、系统设计以及具体的SpringBoot实现方式。

前言:本文旨在深入解析基于RabbitMQ的消息队列技术实现延迟任务的方法,涵盖从基础概念到实际应用的全过程,适用于对消息队列及延迟任务感兴趣的开发人员和技术爱好者。


一、延迟任务概述

延迟任务在现代互联网应用中十分常见,如订单超时自动取消、支付回调重试等。这些场景的特点是在未来某一时刻执行特定操作,并且通常只需执行一次。对于订单超时取消这类任务,由于其幂等性,不必担心重复消费的问题;而对于支付回调重试,则需要特别注意避免重复处理同一笔交易。


1、工作原理

在RabbitMQ中实现延迟任务,生产者将带有延迟属性的消息发送至指定的交换机。该交换机会根据设定的延迟时间将消息转发给相应的队列,消费者则通过监听队列来获取并处理这些消息。值得注意的是,为了保证系统的高可用性和稳定性,必须采取措施确保消息在交换机中的持久化存储,以防因服务中断导致未完成的任务丢失。


2、技术选型


二、系统设计方案


(一)环境配置

为支持延迟消息功能,需在RabbitMQ服务器上安装插件。此插件允许开发者指定消息的延迟时间,从而实现精准的定时任务调度。


(二)生产者实现

生产者的职责是确保消息能够准确无误地发送到RabbitMQ。为此,我们采用confirm确认机制,即消息发送后等待RabbitMQ的确认回复,以验证消息是否成功到达。例如,在创建订单的过程中,首先将订单信息保存到数据库和Redis缓存中,随后发送带有延迟属性的消息至RabbitMQ。一旦接收到确认响应,便从Redis中移除相关记录;若未能成功发送,则重新尝试直至成功。


(三)消费者实现

消费者负责接收并处理来自队列的消息,关键在于确保消息不会丢失且能正确处理。这包括手动确认每条消息的接收状态,保持消费者的长期在线,以及为可能出现的错误提供重试机制。特别是对于那些幂等性的操作,如订单取消,无需额外处理重复消费的情况。


三、Spring Boot实践示例

以下是使用Spring Boot框架实现延迟任务的部分核心代码,完整项目可在GitHub上找到。


(一)生产者代码片段

在处理订单时,首先确保订单数据的安全存储,然后利用RabbitMQ发送带有延迟属性的消息。

for (long i = 1; i <= 10; i++) {
// 模拟生成订单
BuOrder order = createOrder(i);
// 订单入库
orderService.saveOrUpdate(order);
// 将订单存入Redis
RedisUtils.setObject(RabbitTemplateConfig.ORDER_PREFIX + i, order);
// 向RabbitMQ异步投递消息
rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId()));
}

生产者通过确认机制确保消息的可靠传递:

public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (correlatiOnData== null) return;
String key = RabbitTemplateConfig.ORDER_PREFIX + correlationData.getId();
if (ack) {
// 消息成功投递,删除Redis中订单数据
RedisUtils.deleteObject(key);
} else {
// 从Redis中读取订单数据,重新投递
BuOrder order = RedisUtils.getObject(key, BuOrder.class);
rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId()));
}
}


(二)消费者代码片段

消费者端通过手动确认消息的消费,确保消息处理的可靠性,并具备自动重试功能。

@RabbitListener(queues = RabbitmqConfig.DELAY_QUEUE_NAME)
public void consumeNode01(Channel channel, Message message, BuOrder order) throws IOException {
if (order.getOrderStatus() == 0) {
// 更新订单状态为已关闭
orderService.updateById(new BuOrder(order.getOrderId(), -1));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info(String.format("消费者节点01消费编号为【%s】的消息", order.getOrderId()));
}
}

建议部署多个消费者实例,以提高消息处理效率,减少队列积压。


(三)辅助工具类

文中提及的RabbitUtils工具类包含了一些与RabbitMQ交互的基础方法,该类位于以下Maven依赖中:


xin.altitude.cms
ucode-cms-common
1.4.3.1


推荐阅读
  • UNP 第9章:主机名与地址转换
    本章探讨了用于在主机名和数值地址之间进行转换的函数,如gethostbyname和gethostbyaddr。此外,还介绍了getservbyname和getservbyport函数,用于在服务器名和端口号之间进行转换。 ... [详细]
  • 本文将介绍如何编写一些有趣的VBScript脚本,这些脚本可以在朋友之间进行无害的恶作剧。通过简单的代码示例,帮助您了解VBScript的基本语法和功能。 ... [详细]
  • Explore a common issue encountered when implementing an OAuth 1.0a API, specifically the inability to encode null objects and how to resolve it. ... [详细]
  • 1:有如下一段程序:packagea.b.c;publicclassTest{privatestaticinti0;publicintgetNext(){return ... [详细]
  • 数据库内核开发入门 | 搭建研发环境的初步指南
    本课程将带你从零开始,逐步掌握数据库内核开发的基础知识和实践技能,重点介绍如何搭建OceanBase的开发环境。 ... [详细]
  • 本文详细介绍了如何使用 Yii2 的 GridView 组件在列表页面实现数据的直接编辑功能。通过具体的代码示例和步骤,帮助开发者快速掌握这一实用技巧。 ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • RecyclerView初步学习(一)
    RecyclerView初步学习(一)ReCyclerView提供了一种插件式的编程模式,除了提供ViewHolder缓存模式,还可以自定义动画,分割符,布局样式,相比于传统的ListVi ... [详细]
  • golang常用库:配置文件解析库/管理工具viper使用
    golang常用库:配置文件解析库管理工具-viper使用-一、viper简介viper配置管理解析库,是由大神SteveFrancia开发,他在google领导着golang的 ... [详细]
  • 本文详细介绍了Java中org.neo4j.helpers.collection.Iterators.single()方法的功能、使用场景及代码示例,帮助开发者更好地理解和应用该方法。 ... [详细]
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • 本文详细介绍了 GWT 中 PopupPanel 类的 onKeyDownPreview 方法,提供了多个代码示例及应用场景,帮助开发者更好地理解和使用该方法。 ... [详细]
  • 深入理解 SQL 视图、存储过程与事务
    本文详细介绍了SQL中的视图、存储过程和事务的概念及应用。视图为用户提供了一种灵活的数据查询方式,存储过程则封装了复杂的SQL逻辑,而事务确保了数据库操作的完整性和一致性。 ... [详细]
  • 本文详细介绍了Java中org.eclipse.ui.forms.widgets.ExpandableComposite类的addExpansionListener()方法,并提供了多个实际代码示例,帮助开发者更好地理解和使用该方法。这些示例来源于多个知名开源项目,具有很高的参考价值。 ... [详细]
  • 本文深入探讨了Linux系统中网卡绑定(bonding)的七种工作模式。网卡绑定技术通过将多个物理网卡组合成一个逻辑网卡,实现网络冗余、带宽聚合和负载均衡,在生产环境中广泛应用。文章详细介绍了每种模式的特点、适用场景及配置方法。 ... [详细]
author-avatar
歌手王紫璇
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有